-
Notifications
You must be signed in to change notification settings - Fork 322
JSONRPCConnection handled in swift instead of DispatchIO #2315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for this PR. Apart from this fixing the Windows thread spinning issue (which is amazing), this looks like a great simplification to me 🎉. I have left a few comments inline.
If we take this, I think we should also switch sendIO
to be backed by FileHandle
instead of DispatchIO
but that can be a follow-up PR to keep this one focused.
} | ||
} | ||
|
||
self.queue.sync { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.queue.async
should be sufficient here, it still guarantees that all incoming data is handled in-order and frees up the thread that the readability handler is running on. Or do you think I’m missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm now using async in the last iteration, but I'm not sure I'm doing it right in each particular case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you think we shouldn’t be using async
here? My thought is that with sync
, we are unnecessarily blocking the thread that handles the readabilityHandler
while we are performing work on queue
.
/// Buffer of received bytes that haven't been parsed. | ||
/// | ||
/// Access to this must be be guaranteed to be sequential to avoid data races. Currently, all access are | ||
/// - The `receiveIO` handler: This is synchronized on `queue`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We now have a corresponding use in self.inFD.readabilityHandler
, so we should include that here:
/// - The `inFD.readabilityHandler`: This is synchronized on `queue`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup I'll need to update the docs once this settles
|
||
logger.log("Closing JSONRPCConnection...") | ||
// Attempt to close the reader immediately; we do not need to accept remaining inputs. | ||
receiveIO.close(flags: .stop) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we set the readabilityHandler
of inFD
to nil
here to ensure that we don’t receive any more messages? If we do so, I think we also no longer need to set fileHandle.readabilityHandler = nil
if data.isEmpty
in the readabilityHandler
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'm doing this now but might be doing it wrong :)
private let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated) | ||
|
||
private let receiveIO: DispatchIO | ||
private let inFD: FileHandle |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we match the previous naming and call this receiveFD
? inFD
can get confusing if you eg. set up a JSONRPCConnection
from SourceKit-LSP to a BSP server, in which case the receiveFD
will be stdout of the BSP server process and thus receiveFD
wouldn’t be stdin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming follows JSONRPCConnection.init()
args names, are you suggesting we change these as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think changing the argument names JSONRPCConnection.init()
would be my preference. Happy to change the argument names in a follow-up PR though, this PR is delicate enough that we don’t need to litter it with other renames. But I would prefer to name the members in here receiveFD
and sendFD
now already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed this half way through again and then decided that it would be more efficient to modify this PR locally, attaching the diff. The key changes apart from my PR comments are:
- Ensuring we set
readabilityHandler
tonil
whenever we close the connection - Removing
sendQueue
. Now that we no longer haveDispatchIO
that’s backed by queues, I don’t think we need it anymore.
Now, the BIGGGGGG question is how to actually read data in readabilityHandler
. We kind of have three options:
fileHandle.availableData
: As noted in swiftlang/swift-package-manager#8047, this has the potential of hanging when a significant amount of data is sent but also various issues on other platforms (because swift-stress-tester doesn’t even run on Windows and I’m pretty sure this line was written without Windows in mind).fileHandle.read(upToLength: Int.max)
only returns data when the end of the file handle is reached. While this is fine for use cases like the ProcessRunner in swift-stress-tester that run short-lived processes, it is completely unusable for our case where we need to stream datafileHandle.read(upToLength: 1)
: Probably quite inefficient to read one byte at a time, especially since we try and will try and parse the header on every byte that’s read. If we want to consider this, we should do some performance measurements, eg. by recording an editor session in an input mirror file, replaying it and seeing how much time is spent parsing.
What do I think about these options?
- (2) does not work for our use case so we can rule that out.
- (1) would be the best solution but I am very hesitant to switch to this unless we at least identified and fixed the Windows issue, which we have the best grasp on. @bnbarham I suppose you don’t remember what kind of issues you were facing 5 years ago when deciding against
availableData
inProcessRunner.swift
? - (3) seems like the only option that’s left to us, though I definitely don’t like it.
Overall, this analysis gives me pretty bad vibes and it feels like we are quite likely to run into new problems on all platforms and it’s really hard to test these edge cases. At the moment we at least have a situation that we understand, does not hang and generally works without the quirk of spinning one thread on Windows. Maybe it would be worth waiting for swift-subprocess to be available in the toolchain and then directly switch to it (I don’t have any information about when this might be)? I’d like to hear more thoughts on this as well.
My changes
diff --git a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift
index c2c1c696..8fb2e491 100644
--- a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift
+++ b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift
@@ -45,21 +45,17 @@ public final class JSONRPCConnection: Connection {
/// The message handler that handles requests and notifications sent through this connection.
///
- /// Access to this must be be guaranteed to be sequential to avoid data races. Currently, all access are
- /// - `init`: Reference to `JSONRPCConnection` trivially can't have escaped to other isolation domains yet.
- /// - `start`: Is required to be call in the same serial code region as the initializer, so
- /// `JSONRPCConnection` can't have escaped to other isolation domains yet.
- /// - `deinit`: Can also only trivially be called once.
+ /// - Important: Must only be accessed from `queue`, `init` and `deinit` to avoid data races.
nonisolated(unsafe) private var receiveHandler: MessageHandler?
- /// The queue on which we read the data
+ /// The queue on which we read and send the data
private let queue: DispatchQueue = DispatchQueue(label: "jsonrpc-queue", qos: .userInitiated)
- /// The queue on which we send data.
- private let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated)
+ /// The file handle with which the other process sends data to us.
+ private let receiveFD: FileHandle
+ /// The file handle with which we can send data to the other process
+ private let sendFD: FileHandle
- private let receiveIO: DispatchIO
- private let sendIO: DispatchIO
private let messageRegistry: MessageRegistry
/// If non-nil, all input received by this `JSONRPCConnection` will be written to the file handle
@@ -86,7 +82,6 @@ public final class JSONRPCConnection: Connection {
/// Buffer of received bytes that haven't been parsed.
///
/// Access to this must be be guaranteed to be sequential to avoid data races. Currently, all access are
- /// - The `receiveIO` handler: This is synchronized on `queue`.
/// - `requestBufferIsEmpty`: Also synchronized on `queue`.
private nonisolated(unsafe) var requestBuffer: [UInt8] = []
@@ -136,64 +131,8 @@ public final class JSONRPCConnection: Connection {
state = .created
self.messageRegistry = messageRegistry
- let ioGroup = DispatchGroup()
-
- #if os(Windows)
- let rawInFD = dispatch_fd_t(bitPattern: inFD._handle)
- #else
- let rawInFD = inFD.fileDescriptor
- #endif
-
- ioGroup.enter()
- receiveIO = DispatchIO(
- type: .stream,
- fileDescriptor: rawInFD,
- queue: queue,
- cleanupHandler: { (error: Int32) in
- if error != 0 {
- logger.fault("IO error \(error)")
- }
- ioGroup.leave()
- }
- )
-
- #if os(Windows)
- let rawOutFD = dispatch_fd_t(bitPattern: outFD._handle)
- #else
- let rawOutFD = outFD.fileDescriptor
- #endif
-
- ioGroup.enter()
- sendIO = DispatchIO(
- type: .stream,
- fileDescriptor: rawOutFD,
- queue: sendQueue,
- cleanupHandler: { (error: Int32) in
- if error != 0 {
- logger.fault("IO error \(error)")
- }
- ioGroup.leave()
- }
- )
-
- ioGroup.notify(queue: queue) { [weak self] in
- guard let self else { return }
- for outstandingRequest in self.outstandingRequests.values {
- outstandingRequest.replyHandler(LSPResult.failure(ResponseError.internalError("JSON-RPC Connection closed")))
- }
- self.outstandingRequests = [:]
- self.receiveHandler = nil // break retain cycle
- Task {
- await self.closeHandler?()
- }
- }
-
- // We cannot assume the client will send us bytes in packets of any particular size, so set the lower limit to 1.
- receiveIO.setLimit(lowWater: 1)
- receiveIO.setLimit(highWater: Int.max)
-
- sendIO.setLimit(lowWater: 1)
- sendIO.setLimit(highWater: Int.max)
+ self.receiveFD = inFD
+ self.sendFD = outFD
}
/// Creates and starts a `JSONRPCConnection` that connects to a subprocess launched with the specified arguments.
@@ -293,27 +232,20 @@ public final class JSONRPCConnection: Connection {
state = .running
self.receiveHandler = receiveHandler
self.closeHandler = closeHandler
+ }
- receiveIO.read(offset: 0, length: Int.max, queue: queue) { done, data, errorCode in
- guard errorCode == 0 else {
- #if !os(Windows)
- if errorCode != POSIXError.ECANCELED.rawValue {
- logger.fault("IO error reading \(errorCode)")
- }
- #endif
- if done { self.closeAssumingOnQueue() }
- return
- }
-
- if done {
- self.closeAssumingOnQueue()
- return
- }
-
- guard let data = data, !data.isEmpty else {
- return
- }
+ self.receiveFD.readabilityHandler = { fileHandle in
+ let data = orLog("Reading from \(self.name)") { try fileHandle.read(upToCount: 1) } ?? Data()
+ guard !data.isEmpty else {
+ // We have reached the end of `receiveFD`, close the connection. This will also set the `readabilityHandler` of
+ // `receiveFD` to `nil`, breaking the retain cycle.
+ self.close()
+ return
+ }
+ // We can run the actual data handling asynchronously in the background, all we need to ensure is that the data is
+ // handled in-order, which the queue does.
+ self.queue.async {
orLog("Writing input mirror file") {
try self.inputMirrorFile?.write(contentsOf: data)
}
@@ -554,17 +486,17 @@ public final class JSONRPCConnection: Connection {
orLog("Writing output mirror file") {
try outputMirrorFile?.write(contentsOf: dispatchData)
}
- sendIO.write(offset: 0, data: dispatchData, queue: sendQueue) { [weak self] done, _, errorCode in
- if errorCode != 0 {
- logger.fault("IO error sending message \(errorCode)")
- if done, let self {
- // An unrecoverable error occurs on the channel’s file descriptor.
- // Close the connection.
- self.queue.async {
- self.closeAssumingOnQueue()
- }
- }
+ do {
+ try sendFD.write(contentsOf: dispatchData)
+ } catch {
+ logger.fault("IO error sending message \(error.forLogging)")
+ // Match the pattern of `close()` but call `closeAssumingOnQueue` asynchronously to make sure that
+ // `closeAssumingOnQueue` is executed after all data from `receiveFD` has been read.
+ self.receiveFD.readabilityHandler = nil
+ self.queue.async {
+ self.closeAssumingOnQueue()
}
+ return
}
}
@@ -584,7 +516,7 @@ public final class JSONRPCConnection: Connection {
/// If an unrecoverable error occurred on the channel's file descriptor, the connection gets closed.
///
/// - Important: Must be called on `queue`
- func send(_ message: JSONRPCMessage) {
+ private func send(_ message: JSONRPCMessage) {
dispatchPrecondition(condition: .onQueue(queue))
let encoder = JSONEncoder()
@@ -646,7 +578,12 @@ public final class JSONRPCConnection: Connection {
/// The user-provided close handler will be called *asynchronously* when all outstanding I/O
/// operations have completed. No new I/O will be accepted after `close` returns.
public func close() {
- queue.sync { closeAssumingOnQueue() }
+ // Stop reading any more data from `receiveFD`. Scheduling `closeAssumingOnQueue` on `queue` after closing
+ // `receiveFD` ensures that we won't read any more data after `closeAssumingOnQueue`.
+ self.receiveFD.readabilityHandler = nil
+ queue.sync {
+ closeAssumingOnQueue()
+ }
}
/// Close the connection, assuming that the code is already executing on `queue`.
@@ -654,15 +591,25 @@ public final class JSONRPCConnection: Connection {
/// - Important: Must be called on `queue`.
private func closeAssumingOnQueue() {
dispatchPrecondition(condition: .onQueue(queue))
- sendQueue.sync {
- guard state == .running else { return }
- state = .closed
-
- logger.log("Closing JSONRPCConnection...")
- // Attempt to close the reader immediately; we do not need to accept remaining inputs.
- receiveIO.close(flags: .stop)
- // Close the writer after it finishes outstanding work.
- sendIO.close()
+ // Stop receiving any more
+ guard state == .running else { return }
+ state = .closed
+
+ logger.log("Closing JSONRPCConnection...")
+ // Attempt to close the reader immediately; we do not need to accept remaining inputs.
+ // Close the writer after it finishes outstanding work.
+ do {
+ try sendFD.close()
+ } catch {
+ logger.error("Failed to close outFD: \(error.forLogging)")
+ }
+ for outstandingRequest in self.outstandingRequests.values {
+ outstandingRequest.replyHandler(LSPResult.failure(ResponseError.internalError("JSON-RPC Connection closed")))
+ }
+ self.outstandingRequests = [:]
+ self.receiveHandler = nil // break retain cycle
+ Task {
+ await self.closeHandler?()
}
}
@@ -694,13 +641,13 @@ public final class JSONRPCConnection: Connection {
id: RequestID,
reply: @escaping @Sendable (LSPResult<Request.Response>) -> Void
) {
- self.queue.sync {
- guard readyToSend() else {
+ self.queue.async {
+ guard self.readyToSend() else {
reply(.failure(.serverCancelled))
return
}
- outstandingRequests[id] = OutstandingRequest(
+ self.outstandingRequests[id] = OutstandingRequest(
responseType: Request.Response.self,
replyHandler: { anyResult in
let result = anyResult.map { $0 as! Request.Response }
@@ -732,7 +679,7 @@ public final class JSONRPCConnection: Connection {
"""
)
- send(.request(request, id: id))
+ self.send(.request(request, id: id))
return
}
}
if errorCode != POSIXError.ECANCELED.rawValue { | ||
logger.fault("IO error reading \(errorCode)") | ||
self.inFD.readabilityHandler = { fileHandle in | ||
let data = fileHandle.availableData |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I unfortunately can’t remember the example when fileHandle.availableData
has issues but I’ve been bitten by it more than once. I just remember that it’s a very delicate scenario where it misbehaves and it’s a pain to debug when it happens. So, I really want to use read(upToCount: Int.max))
here.
fileHandle.readabilityHandler = nil | ||
self.queue.async { | ||
self.closeAssumingOnQueue() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to return
here. If we have set the readabilityHandler
to nil
and scheduled queue closing, we don’t have any further processing to do, do we?
Also, if data.isEmpty
could then become a guard
statement.
} | ||
} | ||
|
||
self.queue.sync { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you think we shouldn’t be using async
here? My thought is that with sync
, we are unnecessarily blocking the thread that handles the readabilityHandler
while we are performing work on queue
.
self.queue.async { | ||
self.closeAssumingOnQueue() | ||
} | ||
sendQueue.sync { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to my other sync
vs async
comment, I think we can make this async
as well. The transmission of the data via stdout is asynchronous anyway, so as long as data is kept in-order (which it is using the queue), we should be able to handle sending in the background.
do { | ||
try outFD.close() | ||
} catch { | ||
logger.error("Failed to close outFD: \(error.forLogging)") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be simplified slightly.
do { | |
try outFD.close() | |
} catch { | |
logger.error("Failed to close outFD: \(error.forLogging)") | |
} | |
orLog("Closing outFD") { | |
try outFD.close() | |
} |
private let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated) | ||
|
||
private let receiveIO: DispatchIO | ||
private let inFD: FileHandle |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think changing the argument names JSONRPCConnection.init()
would be my preference. Happy to change the argument names in a follow-up PR though, this PR is delicate enough that we don’t need to litter it with other renames. But I would prefer to name the members in here receiveFD
and sendFD
now already.
private let sendIO: DispatchIO | ||
private let inFD: FileHandle | ||
private let outFD: FileHandle | ||
let ioGroup: DispatchGroup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be private
?
|
||
self.ioGroup.enter() | ||
|
||
ioGroup.notify(queue: queue) { [weak self] in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the entire ioGroup
is now just a convoluted way of saying: This should be executed when the connection is closed: This will get executed when ioGroup.leave()
is called, which happens after calling closeAssumingOnQueue
in JSONRPCConnection.close()
and the error handler of JSONRPCConnectin.send
.
So, if we just executed this code inside closeAssumingOnQueue
, I think we could get rid of the entire ioGroup
logic and greatly simplify this.
Incidentally, now that I look at this, we might have been missing a few ioGroup.leave()
before as well.
Thanks Alex, that's a very valid approach - this is still very much a prototype from someone who has little idea what they're doing in Swift!
I think we should broaden this a bit to include options whether we should be using
Yes, I believe this just doesn't work. I switched to it when it was suggested without realizing it would block until EOF, just to discover later on that everything would hang during testing, and it made sense after looking into that. I don't think this works at all.
Even a function call per each byte seems likely to run into performance issues, so I think it's fair to rule this out.
If there is value in doing that, I'm happy to investigate issues caused by
I think this is very fair as long as swift-subprocess doesn't use DispatchIO on Windows. I wasn't aware of this effort and it definitely looks like the better option if available. I suppose the best route might depend on how long it's going to take until this can be integrated - if this is still like a year away then maybe we could try to do something in the mean time - at least it could be platform-specific code for Windows to avoid creating hazards on other platforms. |
I do not :(. I'm fairly sure it was just based on online discussions about availableData/readData |
DispatchIO causes complications on Windows that we don't know how to get around:
https://forums.swift.org/t/sourcekit-lsp-cpu-spinning-and-libdispatch-complications/82472
This avoids the CPU spinning issue in sourcekit-lsp on Windows #1461